package com.gagakuai.app.flink.watermark;

import com.gagakuai.domain.facts.Fact;
import com.gagakuai.infrastructure.utils.DateUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;

import java.time.LocalDateTime;

/**
 * @description: 自定义水印时间戳生成器
 * @author: houhong
 * @create: 2024-09-07 20:29
 **/
public class IndexTimestampAssigner implements SerializableTimestampAssigner<Fact<?>> {

    /*
    * 提取事件流的event_time字段
    */
    @Override
    public long extractTimestamp(Fact<?> fact, long l) {

        //提取时间字段, 并转换时间戳，时间戳是毫秒。
        LocalDateTime localDateTime = DateUtil.convertStr2LocalDateTime((String)fact.getValue());
        return DateUtil.convertLocalDateTime2Timestamp(localDateTime);

    }
}
